package org.apache.spark.deploy.master

import java.io.File

import com.digitalchina.dcn.dscc.portal.protocol.PortalMessage
import org.apache.spark.deploy.master.MasterMessages.{BoundPortsRequest, BoundPortsResponse, MasterResponse}
import org.apache.spark.deploy.worker.WorkerMessages.RegisterWorker
import org.apache.spark.internal.Logging
import org.apache.spark.rpc.netty.NettyRpcEnv
import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
import org.apache.spark.util.{SparkEnv, Utils}
import org.apache.spark.{SecurityManager, SparkConf}

/**
  * Created by Administrator on 2016/8/19.
  */
class Master(override val rpcEnv: RpcEnv,
             address: RpcAddress,
            val securityMgr: SecurityManager,
            val conf: SparkConf)  extends ThreadSafeRpcEndpoint with Logging{

  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case BoundPortsRequest =>
      context.reply(BoundPortsResponse(address.port, 23, Some(23)))
    case RegisterWorker(host, port) =>
      logInfo(s"register worker,  $host:$port" )
      context.reply(MasterResponse(true))
  }
}

object Master extends Logging{

  val hostname = Utils.localHostName()
  val port = 7077
  val SYSTEM_NAME = "sparkMaster"
  val ENDPOINT_NAME = "Master"

  def main(argStrings: Array[String]) {
    Utils.initDaemon(log)
    val conf = new SparkConf
    //    val args = new MasterArguments(argStrings, conf)
    val rpcEnv = startRpcEnvAndEndpoint(hostname, port, conf)

    val nettyRpcEnv = rpcEnv.asInstanceOf[NettyRpcEnv]
    nettyRpcEnv.startAuth("192.168.3.38", "steve", "test")
    Thread.sleep(4000);

    println("start second auth")
    nettyRpcEnv.startAuth("192.168.3.38", "steve", "test")
    Thread.sleep(20000)
    nettyRpcEnv.startLogout("192.168.3.38")

    SparkEnv.set(new SparkEnv(rpcEnv))
//    rpcEnv.fileServer.addFile(new File("d:/hbase-1.1.5-bin.tar.gz"));
//    val url = s"spark://$hostname:$port/files/hbase-1.1.5-bin.tar.gz"
//    Utils.fetchFile(url, new File("F:/"), conf, new SecurityManager(conf), System.currentTimeMillis(), true)
    rpcEnv.awaitTermination()
  }

  /**
    * Start the Master and return a three tuple of:
    *   (1) The Master RpcEnv
    *   (2) The web UI bound port
    *   (3) The REST server bound port, if any
    */
  def startRpcEnvAndEndpoint(
                              host: String,
                              port: Int,
                              conf: SparkConf): RpcEnv = {
    val securityMgr = new SecurityManager(conf)
    val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
    val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
      new Master(rpcEnv, rpcEnv.address, securityMgr, conf))
    val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
    rpcEnv
  }

}
